package com.gvsoft.communication.net;

import com.gvsoft.communication.common.IConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created with IntelliJ IDEA.
 * ProjectName:gvMsgRouting
 * Author: zhaoqiubo
 * Date: 15/9/1
 * Time: 下午3:00
 * Desc:网络监听器主线程
 */
public class ChannelListener extends Thread {

    private final static Logger logger = LogManager.getLogger("net");

    private Selector selector;
    private Dispatcher dispatcher;
    private boolean interrupt;
    IConfig cfg;
    /**
     * 存储正在连接的通道和NSocket
     */
    private Queue<Object[]> connectingChannel = new LinkedBlockingQueue<>();

    public ChannelListener(Dispatcher dispatcher, String name, IConfig cfg) throws IOException {
        super(name);
        this.dispatcher = dispatcher;
        this.selector = Selector.open();
        this.cfg = cfg;

    }

    public void run() {
        while (!interrupt) {
            try {
                //监听事件key
                selector.select();
                handleOtherClientConnect();
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    try {
                        //删除Iterator中的当前key，避免重复处理
                        keys.remove();
                        if (key.isAcceptable()) {
                            handleAccept(key);
                        } else if (key.isConnectable()) {
                            handleConnect(key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        } else if (key.isWritable()) {
                            handleWrite(key);
                        }
                    } catch (Throwable t) {
                        logger.error("event error:" + t.getMessage());
                        if (key.channel() != null) {
                            NSocket nSocket = (NSocket) key.attachment();
                            try {
                                closeNSocket(nSocket, t);
                            } catch (IOException e) {
                                logger.error("closeNSocket Exception:" + e.getMessage());
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                if (selector.isOpen()) {
                    logger.error("selector error:" + t.getMessage());
                }
            }
        }
    }

    private void closeNSocket(NSocket nSocket, Throwable t) throws IOException {
//        t.printStackTrace();
        logger.info("<" + nSocket.getUserId() + ">'s Channel closing……");
        nSocket.getIoAdapter().onError(nSocket, t);
        nSocket.close();

    }

    /**
     * 处理新接入的客户端通道(服务端使用)
     *
     * @throws Exception
     */
    private void handleOtherClientConnect() throws Exception {

        Object[] objects;
        if ((objects = connectingChannel.poll()) != null) {
            SocketChannel channel = (SocketChannel) objects[0];
            NSocket nSocket = (NSocket) objects[1];
            nSocket.setKey(channel.register(this.selector, SelectionKey.OP_READ, nSocket));
            nSocket.setNew();

        }
    }

    /**
     * 处理accept事件(服务端使用)
     *
     * @param key 连接事件key
     */
    private void handleAccept(SelectionKey key) throws Exception {

        SocketChannel channel;
        //从客户端送来的key中获取ServerSocket通道
        //接收此ServerSocket通道中的Socket通道，accept是一个阻塞方法，一直到获取到连接才会继续
        channel = ((ServerSocketChannel) key.channel()).accept();
        //将此socket通道设置为非阻塞模式
        channel.configureBlocking(false);
        ChannelListener listener = dispatcher.getNextListener();
        Selector selector = listener.getSelector();
        NSocket nSocket = new NSocket(channel, systemTimeUtc(), dispatcher, cfg.getIoAdapter(), cfg.getSocketStatusHandle());

        //如果nextlistener就是当前的listener，则直接注册在当前的selector上面,
        // 如果不是当前的当前的线程，那么放入队列，让其他线程select()后，调用handleConnect()方法
        if (listener == this) {
            nSocket.setKey(channel.register(selector, SelectionKey.OP_READ, nSocket));
            nSocket.setNew();
        } else {
            Object[] objects = {channel, nSocket};
            listener.connectingChannel.offer(objects);
            listener.selector.wakeup();
        }

    }

    /**
     * 处理连接事件(客户端使用)
     *
     * @param key 连接事件
     * @throws IOException
     */
    private void handleConnect(SelectionKey key) throws IOException {
        NSocket nSocket = (NSocket) key.attachment();

        while (!((SocketChannel) key.channel()).finishConnect()&&!interrupt) {
        }
        dispatcher.connectLatchDown();
        nSocket.setConnected();
        key.interestOps(SelectionKey.OP_READ);
        selector.wakeup();

    }

    /**
     * 处理read事件
     *
     * @param key 读事件key
     * @throws IOException
     */
    private void handleRead(SelectionKey key) throws Exception {
        NSocket nSocket = (NSocket) key.attachment();
        nSocket.doRead();
    }

    /**
     * 处理wirte事件
     *
     * @param key 写入事件
     */
    private void handleWrite(SelectionKey key) throws Exception {
        NSocket nSocket = (NSocket) key.attachment();
        nSocket.doWrite();
    }

    @Override
    public void interrupt() {
        this.interrupt = true;
        super.interrupt();
        if (selector != null) {
            try {
                this.selector.close();
            } catch (IOException e) {
                logger.info(e.getMessage());
            }
        }
    }

    public Selector getSelector() {
        return selector;
    }

    //返回系统当前时间秒
    private int systemTimeUtc() {
        return (int) (System.currentTimeMillis() / 1000L);
    }


}
